package org.apache.rocketmq.proxy.service.relay;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.transaction.TransactionService;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;

import java.util.concurrent.CompletableFuture;

/**
 * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
 * @author：陈清风扬，个人微信号：chenqingfengyangjj。
 * @date:2025/6/15
 * @方法描述：该类的对象就是为Broker节点转发消息的
 */
public class LocalProxyRelayService extends AbstractProxyRelayService {

    private final BrokerController brokerController;

    public LocalProxyRelayService(BrokerController brokerController, TransactionService transactionService) {
        super(transactionService);
        this.brokerController = brokerController;
    }


    /**
     * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
     * @author：陈清风扬，个人微信号：chenqingfengyangjj。
     * @方法描述：该方法会把客户端回复给Proxy的响应转发给Broker节点
     */
    @Override
    public CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> processGetConsumerRunningInfo(
            ProxyContext context, RemotingCommand command, GetConsumerRunningInfoRequestHeader header) {
        CompletableFuture<ProxyRelayResult<ConsumerRunningInfo>> future = new CompletableFuture<>();
        //注意，这里向future中注册了一个回调方法，该回调方法会在future被设置为完成状态后执行
        future.thenAccept(proxyOutResult -> {
            //在这里使用brokerController得到Broker的netty服务器对象
            RemotingServer remotingServer = this.brokerController.getRemotingServer();
            if (remotingServer instanceof NettyRemotingAbstract) {
                NettyRemotingAbstract nettyRemotingAbstract = (NettyRemotingAbstract) remotingServer;
                //创建一个响应对象
                RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(null);
                //注意，这里的command是Broker发送给proxy的请求对象，现在只是把请求Id设置到响应中了
                remotingCommand.setOpaque(command.getOpaque());
                //从客户端回复的响应结果中得到响应状态码，设置到响应对象中
                remotingCommand.setCode(proxyOutResult.getCode());
                //设置备注信息
                remotingCommand.setRemark(proxyOutResult.getRemark());
                //判断响应是否成功
                if (proxyOutResult.getCode() == ResponseCode.SUCCESS && proxyOutResult.getResult() != null) {
                    //如果成功则从客户端响应中得到消费者的运行信息
                    ConsumerRunningInfo consumerRunningInfo = proxyOutResult.getResult();
                    //把消费者运行信息设置到响应体中
                    remotingCommand.setBody(consumerRunningInfo.encode());
                }
                //创建封装客户端和proxy地址信息的channel对象，这个channel对象目前没什么用，非常简单，大家简单看一下即可
                SimpleChannel simpleChannel = new SimpleChannel(context.getRemoteAddress(), context.getLocalAddress());
                //把响应交给Broker服务端处理了
                nettyRemotingAbstract.processResponseCommand(simpleChannel.getChannelHandlerContext(), remotingCommand);
            }
        });
        //返回future对象
        return future;
    }


    /**
     * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
     * @author：陈清风扬，个人微信号：chenqingfengyangjj。
     * @方法描述：该方法和上面方法的逻辑基本一致，我就不重复添加注释了
     */
    @Override
    public CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> processConsumeMessageDirectly(
            ProxyContext context, RemotingCommand command,
            ConsumeMessageDirectlyResultRequestHeader header) {
        CompletableFuture<ProxyRelayResult<ConsumeMessageDirectlyResult>> future = new CompletableFuture<>();
        future.thenAccept(proxyOutResult -> {
            RemotingServer remotingServer = this.brokerController.getRemotingServer();
            if (remotingServer instanceof NettyRemotingAbstract) {
                NettyRemotingAbstract nettyRemotingAbstract = (NettyRemotingAbstract) remotingServer;
                RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(null);
                remotingCommand.setOpaque(command.getOpaque());
                remotingCommand.setCode(proxyOutResult.getCode());
                remotingCommand.setRemark(proxyOutResult.getRemark());
                if (proxyOutResult.getCode() == ResponseCode.SUCCESS && proxyOutResult.getResult() != null) {
                    ConsumeMessageDirectlyResult consumeMessageDirectlyResult = proxyOutResult.getResult();
                    remotingCommand.setBody(consumeMessageDirectlyResult.encode());
                }
                SimpleChannel simpleChannel = new SimpleChannel(context.getRemoteAddress(), context.getLocalAddress());
                nettyRemotingAbstract.processResponseCommand(simpleChannel.getChannelHandlerContext(), remotingCommand);
            }
        });
        return future;
    }
}
